use crate::CubeError;
use cubedatasketches::{HLLDataSketch, HLLUnionDataSketch};
use cubehll::HllSketch;
use cubezetasketch::HyperLogLogPlusPlus;

#[derive(Debug)]
pub enum Hll {
    Airlift(HllSketch),              // Compatible with Athena, Presto, etc.
    ZetaSketch(HyperLogLogPlusPlus), // Compatible with BigQuery.
    DataSketches(HLLDataSketch),     // Compatible with DataBricks
}

const DS_LIST_PREINTS: u8 = 2;
const DS_HASH_SET_PREINTS: u8 = 3;
const DS_HLL_PREINTS: u8 = 10;
const DS_SER_VER: u8 = 1;
const DS_FAMILY_ID: u8 = 7;

impl Hll {
    pub fn read(data: &[u8]) -> Result<Hll, CubeError> {
        if data.is_empty() {
            return Err(CubeError::internal(
                "invalid serialized HLL (empty data)".to_string(),
            ));
        }

        //  - must larger than 3 due to how protos are encoded in ZetaSketch.
        //  - represents the data format version and is <= 3 in AirLift.
        // -  checking first 3 bytes for figure out HLL from Apache DataSketches
        if (data[0] == DS_LIST_PREINTS
            || data[0] == DS_HASH_SET_PREINTS
            || data[0] == DS_HLL_PREINTS)
            && data[1] == DS_SER_VER
            && data[2] == DS_FAMILY_ID
        {
            return Ok(Hll::DataSketches(HLLDataSketch::read(data)?));
        } else if data[0] <= 3 {
            return Ok(Hll::Airlift(HllSketch::read(data)?));
        } else {
            return Ok(Hll::ZetaSketch(HyperLogLogPlusPlus::read(data)?));
        }
    }

    pub fn write(&self) -> Vec<u8> {
        match self {
            Self::Airlift(h) => h.write(),
            Self::ZetaSketch(h) => h.write(),
            Self::DataSketches(h) => h.write(),
        }
    }

    pub fn cardinality(&mut self) -> u64 {
        match self {
            Hll::Airlift(h) => h.cardinality(),
            Hll::ZetaSketch(h) => h.cardinality(),
            Hll::DataSketches(h) => h.cardinality(),
        }
    }
}

#[derive(Debug)]
pub enum HllUnion {
    Airlift(HllSketch),
    ZetaSketch(HyperLogLogPlusPlus),
    DataSketches(HLLUnionDataSketch),
}

impl HllUnion {
    pub fn new(hll: Hll) -> Result<Self, CubeError> {
        match hll {
            Hll::Airlift(h) => Ok(Self::Airlift(h)),
            Hll::ZetaSketch(h) => Ok(Self::ZetaSketch(h)),
            Hll::DataSketches(h) => {
                let mut union = HLLUnionDataSketch::new(h.get_lg_config_k())?;
                union.merge_with(h)?;

                Ok(Self::DataSketches(union))
            }
        }
    }

    pub fn write(&self) -> Vec<u8> {
        match self {
            Self::Airlift(h) => h.write(),
            Self::ZetaSketch(h) => h.write(),
            Self::DataSketches(h) => h.write(),
        }
    }

    pub fn is_compatible(&self, other: &Hll) -> bool {
        match (self, other) {
            (Self::Airlift(l), Hll::Airlift(r)) => l.index_bit_len() == r.index_bit_len(),
            (Self::ZetaSketch(l), Hll::ZetaSketch(r)) => l.is_compatible(r),
            (Self::DataSketches(l), Hll::DataSketches(r)) => {
                l.get_lg_config_k() == r.get_lg_config_k()
            }
            _ => return false,
        }
    }

    /// Clients are responsible for calling `is_compatible` before running this function.
    /// On error, `self` may end up in inconsistent state and must be discarded.
    pub fn merge_with(&mut self, other: Hll) -> Result<(), CubeError> {
        debug_assert!(self.is_compatible(&other));

        match (self, other) {
            (Self::Airlift(l), Hll::Airlift(r)) => l.merge_with(&r),
            (Self::ZetaSketch(l), Hll::ZetaSketch(r)) => l.merge_with(&r)?,
            (Self::DataSketches(l), Hll::DataSketches(r)) => l.merge_with(r)?,
            _ => return Err(CubeError::internal("incompatible HLL types".to_string())),
        }

        return Ok(());
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn hll_detect_sketches_success() -> Result<(), CubeError> {
        {
            let mut sketch = Hll::read(&hex::decode(&"0201070C03080108320B1F05")?)?;
            assert!(matches!(sketch, Hll::DataSketches(_)));
            assert_eq!(sketch.cardinality(), 1);
        }

        {
            let mut sketch = Hll::read(&hex::decode(&"0301070C050800010800000064987A05A5456B06E9EBD51A4C116B08307C4E047258E51293723306176C6E06")?)?;
            assert!(matches!(sketch, Hll::DataSketches(_)));
            assert_eq!(sketch.cardinality(), 8);
        }

        {
            let mut sketch = Hll::read(&hex::decode(&"0A01070C0008000274799646F46B824000000040101BAD400000000000000000DC0D0000000000000000000600000001000100000000200000000000100100000000000000000000000000000000000000007000000000000000010000030000500000000050000000000002200003202000002000040000000000001003000010000000020000000000030000000000000000010000000000000010002206000000500000000001000000130000000003000000000020000000270000020000002000201000000100000000010001090000020000002000010120000000000000000000000000000010000004000000001000000010000000000000000000700000000000000000000100000001000011000000000000000000000000100000000000000010001000000000001002000000000100000000000000000000000004000000000000000000002000000000011000000000002020000000050000000000000000000010000000000000000001100050000000000220000000000000000000000010300000041000000000000000108000000000104000000001303410010001000002000000000000000000000001002000000002020000030001000013000000000010000000000010020000000005100000000020000000900000000000003100100000000000020000000000000000100000000000000004000200300000001000001110000000000000000000103005010020060000010000000000010000010000010002000000000000000010100000000000010001000003000000000030020110000000000000010020000000000000000100000000000000010003000005002000000400000000000000100200100000000000100000300000000000100000300000000000010200000000002000502000020000000000000000000000303000000000000000000000040000000040000000000020000010001000003010005100002001000000000000000000000000003000000000103000005000012000100000010B00000000000001030020000000000000000000020100002003001000000000002000000000210410000001000050000000000120000000000010000001000202000100000001002000000200001000000100100000050200000000000030001000020000001002402230001000010010010000110200010001100000000100001010000100010000000100000000100040020000106000020000000000000000001001000000000000000101000000010000131000000000000010010000000400000000000000010001000000000000000000000000000000000000001302001000020000000000001300000000000000000000000002000000200000000000004000101000100000000000000300000000020000000000100001000001000000330000000000000000000400000400000000000000003000000000000000000000000000000000100000000100000000000000000001000000000020010000000000020000524000020701000000100000000000001000000000020000000021000002210000030003000000000000000000000000001010000010101100000100030000001101000000031100010000000000000000000000032000010000000000000200010000000000000000020000030010301000000030001000000000000000100000000040000004001002200012020000000000000110000000000020000001000000000000000000000000000000021000022000000000000000100000000000000010000000010000000000000020000000000100001002000000000400000000000203010000000022000000000010000000000004000100000000010000000000000012000000010200000000000002010200000204000000002300000100002010000000000000000000000000200000000002020002000000001000000100000001000000000010000000001000104000400000600001000000100000000000000000003000100000150000010020000031000000000000000000003012002000001000000100000000000300030000000401000000000000000020010000000000000102000000000000000001000000000000000500002000000000300502000000020000010000000000000011000000000000000003000000000000010000000000020000100000100001000000000000000010000000020000001020000000000000000000001000000010002002000000000000300000000020000000000000000000040000000000000010221000000001000006000000000000000000010020010001000000401000000000000030000000000001000000110000000000000120000000000000010000400300010000010100000001002000001000000600001000000000000000000021001000000003000000000000000000002000020000000500000000000010000050040002000000000000000000000000100000000004000000030200000000050000100000000000000300000001020000000000010000000300000000000003000000500000010000000000000000000000000000000300000000000000004000000000000000520000000000000000010000002000200000000300200020000002000000000000002000000000100000000000000050000100111700000010000000000000000000000030001000000002000000000000002100000000000000000000000000000100000004300000000003000000000303001000000300300000002000001000013010000000002000000000000000000010020000400000100000001000100300000000000000105300000000000000000000000000000010303000")?)?;
            assert!(matches!(sketch, Hll::DataSketches(_)));
            assert_eq!(sketch.cardinality(), 589);
        }

        Ok(())
    }
}
